/** Copyright 2020-2023 Alibaba Group Holding Limited.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef MODULES_GRAPH_FRAGMENT_ARROW_FRAGMENT_MOD_H_
#define MODULES_GRAPH_FRAGMENT_ARROW_FRAGMENT_MOD_H_

#include <cstddef>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "arrow/api.h"
#include "arrow/io/api.h"

#include "grape/fragment/fragment_base.h"
#include "grape/graph/adj_list.h"
#include "grape/utils/vertex_array.h"

#include "client/ds/core_types.h"
#include "client/ds/object_meta.h"

#include "basic/ds/arrow.h"
#include "basic/ds/arrow_utils.h"
#include "common/util/typename.h"

#include "graph/fragment/arrow_fragment_base.h"
#include "graph/fragment/fragment_traits.h"
#include "graph/fragment/graph_schema.h"
#include "graph/fragment/property_graph_types.h"
#include "graph/utils/error.h"
#include "graph/vertex_map/arrow_local_vertex_map.h"
#include "graph/vertex_map/arrow_vertex_map.h"

namespace gs {

// Forward declaration only: vineyard::ArrowFragment names this template in a
// friend declaration, so it must be visible before that class is defined.
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T,
          typename VERTEX_MAP_T>
class ArrowProjectedFragment;

}  // namespace gs

namespace vineyard {

/**
 * Builds the name "<prefix>_<label>".
 *
 * @param prefix Leading part of the name.
 * @param label  Label id, appended via std::to_string after an underscore.
 * @return The composed name.
 */
inline std::string generate_name_with_suffix(
    const std::string& prefix, property_graph_types::LABEL_ID_TYPE label) {
  std::string name(prefix);
  name += "_";
  name += std::to_string(label);
  return name;
}

/**
 * Builds the name "<prefix>_<v_label>_<e_label>".
 *
 * @param prefix  Leading part of the name.
 * @param v_label First label id, appended via std::to_string.
 * @param e_label Second label id, appended via std::to_string.
 * @return The composed name.
 */
inline std::string generate_name_with_suffix(
    const std::string& prefix, property_graph_types::LABEL_ID_TYPE v_label,
    property_graph_types::LABEL_ID_TYPE e_label) {
  std::string name(prefix);
  name += "_";
  name += std::to_string(v_label);
  name += "_";
  name += std::to_string(e_label);
  return name;
}

// Forward declaration only: ArrowFragment declares this builder as a friend.
template <typename OID_T, typename VID_T, typename VERTEX_MAP_T>
class ArrowFragmentBaseBuilder;

template <typename OID_T, typename VID_T,
          typename VERTEX_MAP_T =
              ArrowVertexMap<typename InternalType<OID_T>::type, VID_T>>
class [[vineyard]] ArrowFragment
    : public ArrowFragmentBase,
      public vineyard::BareRegistered<
          ArrowFragment<OID_T, VID_T, VERTEX_MAP_T>> {
 public:
  // Identifier types taken from the template parameters and from
  // property_graph_types.
  using oid_t = OID_T;
  using vid_t = VID_T;
  using internal_oid_t = typename InternalType<oid_t>::type;
  using eid_t = property_graph_types::EID_TYPE;
  using prop_id_t = property_graph_types::PROP_ID_TYPE;
  using label_id_t = property_graph_types::LABEL_ID_TYPE;
  // Inner, outer and all vertices are described by the same range type.
  using vertex_range_t = grape::VertexRange<vid_t>;
  using inner_vertices_t = vertex_range_t;
  using outer_vertices_t = vertex_range_t;
  using vertices_t = vertex_range_t;
  // Neighbor / adjacency-list types used by the adjacency accessors.
  using nbr_t = property_graph_utils::Nbr<vid_t, eid_t>;
  using nbr_unit_t = property_graph_utils::NbrUnit<vid_t, eid_t>;
  using adj_list_t = property_graph_utils::AdjList<vid_t, eid_t>;
  using raw_adj_list_t = property_graph_utils::RawAdjList<vid_t, eid_t>;
  using vertex_map_t = VERTEX_MAP_T;
  using vertex_t = grape::Vertex<vid_t>;

  // Hash map keyed and valued by vid_t, hashed with the vineyard Hashmap's
  // key hash.
  using ovg2l_map_t =
      ska::flat_hash_map<vid_t, vid_t, typename Hashmap<vid_t, vid_t>::KeyHash>;

  // Arrow / vineyard array and builder types for vertex ids and edge ids.
  using vid_array_t = ArrowArrayType<vid_t>;
  using vid_vineyard_array_t = ArrowVineyardArrayType<vid_t>;
  using vid_vineyard_builder_t = ArrowVineyardBuilderType<vid_t>;
  using eid_array_t = ArrowArrayType<eid_t>;
  using eid_vineyard_array_t = ArrowVineyardArrayType<eid_t>;
  using eid_vineyard_builder_t = ArrowVineyardBuilderType<eid_t>;

  using vid_builder_t = ArrowBuilderType<vid_t>;

  // Per-vertex data arrays over all vertices.
  template <typename DATA_T>
  using vertex_array_t = grape::VertexArray<vertices_t, DATA_T>;

  // Per-vertex data arrays over the inner vertices.
  template <typename DATA_T>
  using inner_vertex_array_t = grape::VertexArray<inner_vertices_t, DATA_T>;

  // Per-vertex data arrays over the outer vertices.
  template <typename DATA_T>
  using outer_vertex_array_t = grape::VertexArray<outer_vertices_t, DATA_T>;

  // The fragment exposes both outgoing and incoming adjacency (see the
  // Get{Outgoing,Incoming}AdjList accessors).
  static constexpr grape::LoadStrategy load_strategy =
      grape::LoadStrategy::kBothOutIn;

 public:
  /// Defaulted destructor.
  ~ArrowFragment() = default;

  /// Returns the object id of the vertex map referenced by this fragment.
  vineyard::ObjectID vertex_map_id() const override { return vm_ptr_->id(); }

  /// Returns the stored `directed_` flag.
  bool directed() const override { return directed_; }

  /// Returns the stored `is_multigraph_` flag.
  bool is_multigraph() const override { return is_multigraph_; }

  /// Returns the stored vertex-id type name (`vid_type`).
  const std::string vid_typename() const override { return vid_type; }

  /// Returns the stored original-id type name (`oid_type`).
  const std::string oid_typename() const override { return oid_type; }

  /**
   * Post-construction hook: initializes the vid parser and the schema,
   * resolves the cached pointers, and counts the local outgoing/incoming
   * edges of every inner vertex across all edge labels.
   *
   * @param meta Object metadata; not read by this method.
   */
  void PostConstruct(const vineyard::ObjectMeta& meta) override {
    vid_parser_.Init(fnum_, vertex_label_num_);
    this->schema_.FromJSON(schema_json_);

    // resolve the cached pointers into arrays and tables
    initPointers();

    // tally the edge numbers locally, then publish them to the members
    size_t out_edges = 0;
    size_t in_edges = 0;
    for (label_id_t vlabel = 0; vlabel < vertex_label_num_; ++vlabel) {
      for (auto& vertex : InnerVertices(vlabel)) {
        for (label_id_t elabel = 0; elabel < edge_label_num_; ++elabel) {
          out_edges += GetLocalOutDegree(vertex, elabel);
          in_edges += GetLocalInDegree(vertex, elabel);
        }
      }
    }
    oenum_ = out_edges;
    ienum_ = in_edges;
  }

  /// Returns this fragment's id (`fid_`).
  fid_t fid() const { return fid_; }

  /// Returns the stored fragment count (`fnum_`).
  fid_t fnum() const { return fnum_; }

  /// Returns the label id that the vid parser extracts from the vertex value.
  label_id_t vertex_label(const vertex_t& v) const {
    return vid_parser_.GetLabelId(v.GetValue());
  }

  /// Returns the offset that the vid parser extracts from the vertex value.
  int64_t vertex_offset(const vertex_t& v) const {
    return vid_parser_.GetOffset(v.GetValue());
  }

  /// Number of vertex labels as reported by the schema (not the
  /// `vertex_label_num_` member).
  label_id_t vertex_label_num() const { return schema_.vertex_label_num(); }

  /// Number of edge labels as reported by the schema (not the
  /// `edge_label_num_` member).
  label_id_t edge_label_num() const { return schema_.edge_label_num(); }

  /**
   * Number of properties that the schema records for the "VERTEX" entry of
   * `label`.
   */
  prop_id_t vertex_property_num(label_id_t label) const {
    std::string entry_kind("VERTEX");
    auto&& entry = schema_.GetEntry(label, entry_kind);
    return static_cast<prop_id_t>(entry.property_num());
  }

  /**
   * Arrow data type of field `prop` in the schema of the vertex table of
   * `label`. Neither index is range-checked here.
   */
  std::shared_ptr<arrow::DataType> vertex_property_type(label_id_t label,
                                                        prop_id_t prop) const {
    auto table_schema = vertex_tables_[label]->schema();
    return table_schema->field(prop)->type();
  }

  /**
   * Number of properties that the schema records for the "EDGE" entry of
   * `label`.
   */
  prop_id_t edge_property_num(label_id_t label) const {
    std::string entry_kind("EDGE");
    auto&& entry = schema_.GetEntry(label, entry_kind);
    return static_cast<prop_id_t>(entry.property_num());
  }

  /**
   * Arrow data type of field `prop` in the schema of the edge table of
   * `label`. Neither index is range-checked here.
   */
  std::shared_ptr<arrow::DataType> edge_property_type(label_id_t label,
                                                      prop_id_t prop) const {
    auto table_schema = edge_tables_[label]->schema();
    return table_schema->field(prop)->type();
  }

  /// Returns the arrow table behind the vertex table of label `i`
  /// (no range check on `i`).
  std::shared_ptr<arrow::Table> vertex_data_table(label_id_t i) const {
    return vertex_tables_[i]->GetTable();
  }

  /// Returns the arrow table behind the edge table of label `i`
  /// (no range check on `i`).
  std::shared_ptr<arrow::Table> edge_data_table(label_id_t i) const {
    return edge_tables_[i]->GetTable();
  }

  /**
   * Wraps column `prop` of the edge table of `label` as an EdgeDataColumn.
   *
   * An empty table yields a default-constructed column; otherwise only chunk 0
   * of the column is used.
   */
  template <typename DATA_T>
  property_graph_utils::EdgeDataColumn<DATA_T, nbr_unit_t> edge_data_column(
      label_id_t label, prop_id_t prop) const {
    using column_t = property_graph_utils::EdgeDataColumn<DATA_T, nbr_unit_t>;
    if (edge_tables_[label]->num_rows() == 0) {
      return column_t();
    }
    // the finalized etables are guaranteed to have been concatenated, so the
    // first chunk holds the whole column
    return column_t(edge_tables_[label]->column(prop)->chunk(0));
  }

  /**
   * Wraps column `prop` of the vertex table of `label` as a VertexDataColumn
   * over the inner vertices of that label.
   *
   * An empty table yields a column built from the vertex range alone;
   * otherwise chunk 0 of the column is attached.
   */
  template <typename DATA_T>
  property_graph_utils::VertexDataColumn<DATA_T, vid_t> vertex_data_column(
      label_id_t label, prop_id_t prop) const {
    using column_t = property_graph_utils::VertexDataColumn<DATA_T, vid_t>;
    if (vertex_tables_[label]->num_rows() == 0) {
      return column_t(InnerVertices(label));
    }
    // the finalized vtables are guaranteed to have been concatenated, so the
    // first chunk holds the whole column
    return column_t(InnerVertices(label),
                    vertex_tables_[label]->column(prop)->chunk(0));
  }

  /**
   * Range over the vertices of `label_id` with offsets
   * [0, tvnums_[label_id]); ids are generated with fragment id 0.
   */
  vertex_range_t Vertices(label_id_t label_id) const {
    const auto range_begin = vid_parser_.GenerateId(0, label_id, 0);
    const auto range_end =
        vid_parser_.GenerateId(0, label_id, tvnums_[label_id]);
    return vertex_range_t(range_begin, range_end);
  }

  /**
   * Range over the vertices of `label_id` with offsets
   * [0, ivnums_[label_id]); ids are generated with fragment id 0.
   */
  vertex_range_t InnerVertices(label_id_t label_id) const {
    const auto range_begin = vid_parser_.GenerateId(0, label_id, 0);
    const auto range_end =
        vid_parser_.GenerateId(0, label_id, ivnums_[label_id]);
    return vertex_range_t(range_begin, range_end);
  }

  /**
   * Range over the vertices of `label_id` with offsets
   * [ivnums_[label_id], tvnums_[label_id]); ids are generated with fragment
   * id 0.
   */
  vertex_range_t OuterVertices(label_id_t label_id) const {
    const auto range_begin =
        vid_parser_.GenerateId(0, label_id, ivnums_[label_id]);
    const auto range_end =
        vid_parser_.GenerateId(0, label_id, tvnums_[label_id]);
    return vertex_range_t(range_begin, range_end);
  }

  /**
   * Sub-range [start, end) of the inner vertices of `label_id`, with `end`
   * clamped to the number of inner vertices.
   *
   * Aborts via CHECK unless `start <= end` and `start` does not exceed the
   * inner vertex count.
   */
  vertex_range_t InnerVerticesSlice(label_id_t label_id, vid_t start, vid_t end)
      const {
    CHECK(start <= end && start <= ivnums_[label_id]);
    // never let the slice run past the last inner vertex
    const vid_t stop = (end <= ivnums_[label_id]) ? end : ivnums_[label_id];
    return vertex_range_t(vid_parser_.GenerateId(0, label_id, start),
                          vid_parser_.GenerateId(0, label_id, stop));
  }

  /// Returns tvnums_[label_id], the upper offset bound used by Vertices().
  inline vid_t GetVerticesNum(label_id_t label_id) const {
    return tvnums_[label_id];
  }

  /**
   * Looks up the vertex with original id `oid` under `label`.
   *
   * @param v Receives the local vertex on success.
   * @return false when the vertex map has no gid for (label, oid), or when the
   *         gid belongs to another fragment and is not a known outer vertex.
   */
  bool GetVertex(label_id_t label, const oid_t& oid, vertex_t& v) const {
    vid_t gid;
    if (!vm_ptr_->GetGid(label, internal_oid_t(oid), gid)) {
      return false;
    }
    if (vid_parser_.GetFid(gid) == fid_) {
      return InnerVertexGid2Vertex(gid, v);
    }
    return OuterVertexGid2Vertex(gid, v);
  }

  /// Original id of `v`, dispatching on whether `v` is an inner vertex.
  oid_t GetId(const vertex_t& v) const {
    if (IsInnerVertex(v)) {
      return GetInnerVertexId(v);
    }
    return GetOuterVertexId(v);
  }

  /// Internal-typed original id of `v`, dispatching on whether `v` is an
  /// inner vertex.
  internal_oid_t GetInternalId(const vertex_t& v) const {
    if (IsInnerVertex(v)) {
      return GetInnerVertexInternalId(v);
    }
    return GetOuterVertexInternalId(v);
  }

  /// Fragment id of `u`: this fragment's id for inner vertices, otherwise the
  /// fragment id parsed from the outer vertex's gid.
  fid_t GetFragId(const vertex_t& u) const {
    if (IsInnerVertex(u)) {
      return fid_;
    }
    return vid_parser_.GetFid(GetOuterVertexGid(u));
  }

  /// Total node count as reported by the vertex map.
  size_t GetTotalNodesNum() const { return vm_ptr_->GetTotalNodesNum(); }
  /// Same value as GetTotalNodesNum().
  size_t GetTotalVerticesNum() const { return vm_ptr_->GetTotalNodesNum(); }
  /// Total node count of `label` as reported by the vertex map.
  size_t GetTotalVerticesNum(label_id_t label) const {
    return vm_ptr_->GetTotalNodesNum(label);
  }

  /// Edge count: out + in edge numbers when directed, otherwise only the out
  /// edge number (both are computed in PostConstruct).
  size_t GetEdgeNum() const { return directed_ ? oenum_ + ienum_ : oenum_; }

  /// Sum of local in-degrees over the inner vertices (see PostConstruct).
  size_t GetInEdgeNum() const { return ienum_; }

  /// Sum of local out-degrees over the inner vertices (see PostConstruct).
  size_t GetOutEdgeNum() const { return oenum_; }

  /**
   * Reads property `prop_id` of vertex `v` as a `T`.
   *
   * The column is selected by the vertex's label and the row by the vertex's
   * offset; no bounds or type checks are performed here.
   */
  template <typename T>
  T GetData(const vertex_t& v, prop_id_t prop_id) const {
    const auto raw_id = v.GetValue();
    const void* column =
        vertex_tables_columns_[vid_parser_.GetLabelId(raw_id)][prop_id];
    return property_graph_utils::ValueGetter<T>::Value(
        column, vid_parser_.GetOffset(raw_id));
  }

  /// True when `v` has at least one local outgoing edge of label `e_label`.
  bool HasChild(const vertex_t& v, label_id_t e_label) const {
    return GetLocalOutDegree(v, e_label) != 0;
  }

  /// True when `v` has at least one local incoming edge of label `e_label`.
  bool HasParent(const vertex_t& v, label_id_t e_label) const {
    return GetLocalInDegree(v, e_label) != 0;
  }

  /// Size of the outgoing adjacency list of `v` for `e_label`, returned as
  /// int.
  int GetLocalOutDegree(const vertex_t& v, label_id_t e_label) const {
    return GetOutgoingAdjList(v, e_label).Size();
  }

  /// Size of the incoming adjacency list of `v` for `e_label`, returned as
  /// int.
  int GetLocalInDegree(const vertex_t& v, label_id_t e_label) const {
    return GetIncomingAdjList(v, e_label).Size();
  }

  // FIXME: grape message buffer compatibility
  /// Converts global id `gid` to a local vertex: inner conversion when the
  /// gid's fragment id equals this fragment's, outer lookup otherwise.
  /// Returns false only when the outer lookup fails.
  bool Gid2Vertex(const vid_t& gid, vertex_t& v) const {
    if (vid_parser_.GetFid(gid) == fid_) {
      return InnerVertexGid2Vertex(gid, v);
    }
    return OuterVertexGid2Vertex(gid, v);
  }

  /// Global id of `v`, dispatching on whether `v` is an inner vertex.
  vid_t Vertex2Gid(const vertex_t& v) const {
    if (IsInnerVertex(v)) {
      return GetInnerVertexGid(v);
    }
    return GetOuterVertexGid(v);
  }

  /// Returns ivnums_[label_id] (no range check on `label_id`).
  inline vid_t GetInnerVerticesNum(label_id_t label_id) const {
    return ivnums_[label_id];
  }

  /// Returns ovnums_[label_id] (no range check on `label_id`).
  inline vid_t GetOuterVerticesNum(label_id_t label_id) const {
    return ovnums_[label_id];
  }

  /// True when the offset of `v` is below the inner vertex count of its
  /// label.
  inline bool IsInnerVertex(const vertex_t& v) const {
    const auto raw_id = v.GetValue();
    const auto v_label = vid_parser_.GetLabelId(raw_id);
    return vid_parser_.GetOffset(raw_id) <
           static_cast<int64_t>(ivnums_[v_label]);
  }

  /// True when the offset of `v` lies in [ivnums_[label], tvnums_[label]) for
  /// the label of `v`.
  inline bool IsOuterVertex(const vertex_t& v) const {
    vid_t offset = vid_parser_.GetOffset(v.GetValue());
    label_id_t label = vid_parser_.GetLabelId(v.GetValue());
    if (!(offset < tvnums_[label])) {
      return false;
    }
    return offset >= ivnums_[label];
  }

  /**
   * Looks up (label, oid) and succeeds only when the resulting gid belongs to
   * this fragment.
   *
   * @param v Set to the local id parsed from the gid on success; untouched
   *          otherwise.
   */
  bool GetInnerVertex(label_id_t label, const oid_t& oid, vertex_t& v) const {
    vid_t gid;
    if (!vm_ptr_->GetGid(label, internal_oid_t(oid), gid)) {
      return false;
    }
    const bool is_local = (vid_parser_.GetFid(gid) == fid_);
    if (is_local) {
      v.SetValue(vid_parser_.GetLid(gid));
    }
    return is_local;
  }

  /**
   * Looks up (label, oid) in the vertex map and then resolves the gid through
   * the outer-vertex gid-to-lid map.
   *
   * @return false when either lookup fails.
   */
  bool GetOuterVertex(label_id_t label, const oid_t& oid, vertex_t& v) const {
    vid_t gid;
    if (!vm_ptr_->GetGid(label, internal_oid_t(oid), gid)) {
      return false;
    }
    return OuterVertexGid2Vertex(gid, v);
  }

  /// Original id of inner vertex `v`, converted from its internal
  /// representation.
  inline oid_t GetInnerVertexId(const vertex_t& v) const {
    return oid_t(GetInnerVertexInternalId(v));
  }

  /**
   * Internal-typed original id of inner vertex `v`.
   *
   * Rebuilds the gid from this fragment's id plus the label and offset of
   * `v`, then asks the vertex map; aborts via CHECK when the gid is unknown.
   */
  inline internal_oid_t GetInnerVertexInternalId(const vertex_t& v) const {
    const auto raw_id = v.GetValue();
    const auto v_label = vid_parser_.GetLabelId(raw_id);
    const auto v_offset = vid_parser_.GetOffset(raw_id);
    vid_t gid = vid_parser_.GenerateId(fid_, v_label, v_offset);
    internal_oid_t internal_oid;
    CHECK(vm_ptr_->GetOid(gid, internal_oid));
    return internal_oid;
  }

  /// Original id of outer vertex `v`, converted from its internal
  /// representation.
  inline oid_t GetOuterVertexId(const vertex_t& v) const {
    return oid_t(GetOuterVertexInternalId(v));
  }

  /// Internal-typed original id of outer vertex `v`: resolves the vertex's gid
  /// and asks the vertex map. Aborts via CHECK when the gid is unknown.
  inline internal_oid_t GetOuterVertexInternalId(const vertex_t& v) const {
    vid_t gid = GetOuterVertexGid(v);
    internal_oid_t internal_oid;
    CHECK(vm_ptr_->GetOid(gid, internal_oid));
    return internal_oid;
  }

  /// Original id for global id `gid`, taken from the vertex map. Aborts via
  /// CHECK when the gid is unknown.
  inline oid_t Gid2Oid(const vid_t& gid) const {
    internal_oid_t internal_oid;
    CHECK(vm_ptr_->GetOid(gid, internal_oid));
    return oid_t(internal_oid);
  }

  /// Writes the global id of (label, oid) into `gid`; returns the vertex
  /// map's lookup result.
  inline bool Oid2Gid(label_id_t label, const oid_t& oid, vid_t& gid) const {
    return vm_ptr_->GetGid(label, internal_oid_t(oid), gid);
  }

  /**
   * Stores the *global* id of (label, oid) as the value of `v`.
   *
   * @return false (leaving `v` untouched) when the vertex map lookup fails.
   */
  inline bool Oid2Gid(label_id_t label, const oid_t& oid, vertex_t& v) const {
    vid_t gid;
    if (!vm_ptr_->GetGid(label, internal_oid_t(oid), gid)) {
      return false;
    }
    v.SetValue(gid);
    return true;
  }

  /// Sets `v` to the local id parsed from `gid`. Always returns true; the
  /// gid's fragment id is not checked here.
  inline bool InnerVertexGid2Vertex(const vid_t& gid, vertex_t& v) const {
    v.SetValue(vid_parser_.GetLid(gid));
    return true;
  }

  /**
   * Resolves `gid` through the gid-to-lid map of the gid's label.
   *
   * @param v Set to the mapped local id when `gid` is present.
   * @return true when `gid` is found in the map, false otherwise.
   */
  inline bool OuterVertexGid2Vertex(const vid_t& gid, vertex_t& v) const {
    auto label_map = ovg2l_maps_ptr_[vid_parser_.GetLabelId(gid)];
    auto hit = label_map->find(gid);
    const bool found = (hit != label_map->end());
    if (found) {
      v.SetValue(hit->second);
    }
    return found;
  }

  /// Global id of outer vertex `v`: indexes the per-label outer gid list with
  /// the vertex offset minus the inner vertex count. No bounds check.
  inline vid_t GetOuterVertexGid(const vertex_t& v) const {
    const auto raw_id = v.GetValue();
    label_id_t v_label = vid_parser_.GetLabelId(raw_id);
    const vid_t* outer_gids = ovgid_lists_ptr_[v_label];
    return outer_gids[vid_parser_.GetOffset(raw_id) -
                      static_cast<int64_t>(ivnums_[v_label])];
  }
  /// Global id of inner vertex `v`: this fragment's id combined with the
  /// label and offset parsed from the vertex value.
  inline vid_t GetInnerVertexGid(const vertex_t& v) const {
    const auto raw_id = v.GetValue();
    const auto v_label = vid_parser_.GetLabelId(raw_id);
    const auto v_offset = vid_parser_.GetOffset(raw_id);
    return vid_parser_.GenerateId(fid_, v_label, v_offset);
  }

  /**
   * Incoming adjacency list of `v` for edge label `e_label`.
   *
   * The neighbor span is delimited by entries `offset` and `offset + 1` of the
   * incoming offset array for (vertex label, e_label); the edge label's
   * property columns are attached. No bounds checks are performed.
   */
  inline adj_list_t GetIncomingAdjList(const vertex_t& v, label_id_t e_label)
      const {
    const vid_t vid = v.GetValue();
    const label_id_t v_label = vid_parser_.GetLabelId(vid);
    const int64_t v_offset = vid_parser_.GetOffset(vid);
    const int64_t* offsets = ie_offsets_ptr_lists_[v_label][e_label];
    const nbr_unit_t* edges = ie_ptr_lists_[v_label][e_label];
    const nbr_unit_t* first = edges + offsets[v_offset];
    const nbr_unit_t* last = edges + offsets[v_offset + 1];
    return adj_list_t(first, last, flatten_edge_tables_columns_[e_label]);
  }

  /**
   * Incoming adjacency list of `v` for edge label `e_label`, without the edge
   * property columns attached. No bounds checks are performed.
   */
  inline raw_adj_list_t GetIncomingRawAdjList(const vertex_t& v,
                                              label_id_t e_label) const {
    const vid_t vid = v.GetValue();
    const label_id_t v_label = vid_parser_.GetLabelId(vid);
    const int64_t v_offset = vid_parser_.GetOffset(vid);
    const int64_t* offsets = ie_offsets_ptr_lists_[v_label][e_label];
    const nbr_unit_t* edges = ie_ptr_lists_[v_label][e_label];
    const nbr_unit_t* first = edges + offsets[v_offset];
    const nbr_unit_t* last = edges + offsets[v_offset + 1];
    return raw_adj_list_t(first, last);
  }

  /**
   * Outgoing adjacency list of `v` for edge label `e_label`.
   *
   * The neighbor span is delimited by entries `offset` and `offset + 1` of the
   * outgoing offset array for (vertex label, e_label); the edge label's
   * property columns are attached. No bounds checks are performed.
   */
  inline adj_list_t GetOutgoingAdjList(const vertex_t& v, label_id_t e_label)
      const {
    const vid_t vid = v.GetValue();
    const label_id_t v_label = vid_parser_.GetLabelId(vid);
    const int64_t v_offset = vid_parser_.GetOffset(vid);
    const int64_t* offsets = oe_offsets_ptr_lists_[v_label][e_label];
    const nbr_unit_t* edges = oe_ptr_lists_[v_label][e_label];
    const nbr_unit_t* first = edges + offsets[v_offset];
    const nbr_unit_t* last = edges + offsets[v_offset + 1];
    return adj_list_t(first, last, flatten_edge_tables_columns_[e_label]);
  }

  /**
   * Outgoing adjacency list of `v` for edge label `e_label`, without the edge
   * property columns attached. No bounds checks are performed.
   */
  inline raw_adj_list_t GetOutgoingRawAdjList(const vertex_t& v,
                                              label_id_t e_label) const {
    const vid_t vid = v.GetValue();
    const label_id_t v_label = vid_parser_.GetLabelId(vid);
    const int64_t v_offset = vid_parser_.GetOffset(vid);
    const int64_t* offsets = oe_offsets_ptr_lists_[v_label][e_label];
    const nbr_unit_t* edges = oe_ptr_lists_[v_label][e_label];
    const nbr_unit_t* first = edges + offsets[v_offset];
    const nbr_unit_t* last = edges + offsets[v_offset + 1];
    return raw_adj_list_t(first, last);
  }

  /**
   * N.B.: as a temporary solution, for the POC of graph-learn; will be removed
   * later.
   */

  /// Raw pointer to the incoming-edge offset array for (v_label, e_label).
  inline const int64_t* GetIncomingOffsetArray(label_id_t v_label,
                                               label_id_t e_label) const {
    return ie_offsets_ptr_lists_[v_label][e_label];
  }

  /// Raw pointer to the outgoing-edge offset array for (v_label, e_label).
  inline const int64_t* GetOutgoingOffsetArray(label_id_t v_label,
                                               label_id_t e_label) const {
    return oe_offsets_ptr_lists_[v_label][e_label];
  }

  /// Length of the incoming-edge offset array for (v_label, e_label).
  inline int64_t GetIncomingOffsetLength(label_id_t v_label, label_id_t e_label)
      const {
    return ie_offsets_lists_[v_label][e_label]->length();
  }

  /// Length of the outgoing-edge offset array for (v_label, e_label).
  inline int64_t GetOutgoingOffsetLength(label_id_t v_label, label_id_t e_label)
      const {
    return oe_offsets_lists_[v_label][e_label]->length();
  }

  inline std::pair<int64_t, int64_t> GetOutgoingAdjOffsets(
      const vertex_t& v, label_id_t e_label) const {
    vid_t vid = v.GetValue();
    label_id_t v_label = vid_parser_.GetLabelId(vid);
    int64_t v_offset = vid_parser_.GetOffset(vid);
    const int64_t* offset_array = oe_offsets_ptr_lists_[v_label][e_label];
    return std::make_pair(offset_array[v_offset], offset_array[v_offset + 1]);
  }

  /// Destination-fragment list of `v` for `e_label`, delimited by entries
  /// `offset` and `offset + 1` of `idoffset_` for (vertex label, e_label).
  inline grape::DestList IEDests(const vertex_t& v, label_id_t e_label) const {
    const auto v_label = vertex_label(v);
    const int64_t offset = vid_parser_.GetOffset(v.GetValue());
    const auto& bounds = idoffset_[v_label][e_label];
    return grape::DestList(bounds[offset], bounds[offset + 1]);
  }

  /// Destination-fragment list of `v` for `e_label`, delimited by entries
  /// `offset` and `offset + 1` of `odoffset_` for (vertex label, e_label).
  inline grape::DestList OEDests(const vertex_t& v, label_id_t e_label) const {
    const auto v_label = vertex_label(v);
    const int64_t offset = vid_parser_.GetOffset(v.GetValue());
    const auto& bounds = odoffset_[v_label][e_label];
    return grape::DestList(bounds[offset], bounds[offset + 1]);
  }

  /// Destination-fragment list of `v` for `e_label`, delimited by entries
  /// `offset` and `offset + 1` of `iodoffset_` for (vertex label, e_label).
  inline grape::DestList IOEDests(const vertex_t& v, label_id_t e_label) const {
    const auto v_label = vertex_label(v);
    const int64_t offset = vid_parser_.GetOffset(v.GetValue());
    const auto& bounds = iodoffset_[v_label][e_label];
    return grape::DestList(bounds[offset], bounds[offset + 1]);
  }

  /// Returns a shared pointer to the vertex map used by this fragment.
  std::shared_ptr<vertex_map_t> GetVertexMap() { return vm_ptr_; }

  /// Returns the schema object that PostConstruct fills from `schema_json_`.
  const PropertyGraphSchema& schema() const override { return schema_; }

  /// Declared here, defined elsewhere.
  /// NOTE(review): presumably builds the destination-fragment lists read by
  /// IEDests/OEDests/IOEDests according to `conf` — confirm against the
  /// implementation.
  void PrepareToRunApp(const grape::CommSpec& comm_spec,
                       grape::PrepareConf conf);

  /// Declared here, defined elsewhere. Takes vertex and edge tables keyed by
  /// label id, consuming both maps (rvalue references).
  /// NOTE(review): the returned ObjectID is presumably that of the newly built
  /// fragment — confirm against the implementation.
  boost::leaf::result<ObjectID> AddVerticesAndEdges(
      Client & client,
      std::map<label_id_t, std::shared_ptr<arrow::Table>> && vertex_tables_map,
      std::map<label_id_t, std::shared_ptr<arrow::Table>> && edge_tables_map,
      ObjectID vm_id,
      const std::vector<std::set<std::pair<std::string, std::string>>>&
          edge_relations,
      int concurrency);

  /// Declared here, defined elsewhere. Vertex-only counterpart of
  /// AddVerticesAndEdges; the table map is consumed.
  boost::leaf::result<ObjectID> AddVertices(
      Client & client,
      std::map<label_id_t, std::shared_ptr<arrow::Table>> && vertex_tables_map,
      ObjectID vm_id);

  /// Declared here, defined elsewhere. Edge-only counterpart of
  /// AddVerticesAndEdges; the table map is consumed.
  boost::leaf::result<ObjectID> AddEdges(
      Client & client,
      std::map<label_id_t, std::shared_ptr<arrow::Table>> && edge_tables_map,
      const std::vector<std::set<std::pair<std::string, std::string>>>&
          edge_relations,
      int concurrency);

  /// Adds a set of new vertex labels and a set of new edge labels to the
  /// graph. New vertex label ids start from vertex_label_num_, and new edge
  /// label ids start from edge_label_num_. Both table vectors are consumed.
  boost::leaf::result<ObjectID> AddNewVertexEdgeLabels(
      Client & client,
      std::vector<std::shared_ptr<arrow::Table>> && vertex_tables,
      std::vector<std::shared_ptr<arrow::Table>> && edge_tables, ObjectID vm_id,
      const std::vector<std::set<std::pair<std::string, std::string>>>&
          edge_relations,
      int concurrency);

  /// Adds a set of new vertex labels to the graph. New vertex label ids start
  /// from vertex_label_num_. The table vector is consumed.
  boost::leaf::result<ObjectID> AddNewVertexLabels(
      Client & client,
      std::vector<std::shared_ptr<arrow::Table>> && vertex_tables,
      ObjectID vm_id);

  /// Adds a set of new edge labels to the graph. New edge label ids start
  /// from edge_label_num_. The table vector is consumed.
  boost::leaf::result<ObjectID> AddNewEdgeLabels(
      Client & client,
      std::vector<std::shared_ptr<arrow::Table>> && edge_tables,
      const std::vector<std::set<std::pair<std::string, std::string>>>&
          edge_relations,
      int concurrency);

  /// Declared here, defined elsewhere. `columns` maps a vertex label id to
  /// (name, arrow::Array) pairs; note the map is taken by value.
  /// NOTE(review): the exact effect of `replace` is not visible here —
  /// confirm against the implementation.
  boost::leaf::result<vineyard::ObjectID> AddVertexColumns(
      vineyard::Client & client,
      const std::map<
          label_id_t,
          std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
          columns,
      bool replace = false) override;

  /// Overload of AddVertexColumns taking arrow::ChunkedArray columns.
  boost::leaf::result<vineyard::ObjectID> AddVertexColumns(
      vineyard::Client & client,
      const std::map<label_id_t,
                     std::vector<std::pair<
                         std::string, std::shared_ptr<arrow::ChunkedArray>>>>
          columns,
      bool replace = false) override;

  /// Template variant parameterized on the column array type.
  /// NOTE(review): presumably the shared implementation behind both
  /// AddVertexColumns overloads — confirm.
  template <typename ArrayType = arrow::Array>
  boost::leaf::result<vineyard::ObjectID> AddVertexColumnsImpl(
      vineyard::Client & client,
      const std::map<
          label_id_t,
          std::vector<std::pair<std::string, std::shared_ptr<ArrayType>>>>
          columns,
      bool replace = false);

  /// Declared here, defined elsewhere. `columns` maps an edge label id to
  /// (name, arrow::Array) pairs; note the map is taken by value.
  /// NOTE(review): the exact effect of `replace` is not visible here —
  /// confirm against the implementation.
  boost::leaf::result<vineyard::ObjectID> AddEdgeColumns(
      vineyard::Client & client,
      const std::map<
          label_id_t,
          std::vector<std::pair<std::string, std::shared_ptr<arrow::Array>>>>
          columns,
      bool replace = false) override;

  /// Overload of AddEdgeColumns taking arrow::ChunkedArray columns.
  boost::leaf::result<vineyard::ObjectID> AddEdgeColumns(
      vineyard::Client & client,
      const std::map<label_id_t,
                     std::vector<std::pair<
                         std::string, std::shared_ptr<arrow::ChunkedArray>>>>
          columns,
      bool replace = false) override;

  /// Template variant parameterized on the column array type.
  /// NOTE(review): presumably the shared implementation behind both
  /// AddEdgeColumns overloads — confirm.
  template <typename ArrayType = arrow::Array>
  boost::leaf::result<vineyard::ObjectID> AddEdgeColumnsImpl(
      vineyard::Client & client,
      const std::map<
          label_id_t,
          std::vector<std::pair<std::string, std::shared_ptr<ArrayType>>>>
          columns,
      bool replace = false);

  /// Declared here, defined elsewhere. Both maps go from a label id to a
  /// vector of ids.
  /// NOTE(review): presumably label id -> property ids to keep — confirm
  /// against the implementation.
  boost::leaf::result<vineyard::ObjectID> Project(
      vineyard::Client & client,
      std::map<label_id_t, std::vector<label_id_t>> vertices,
      std::map<label_id_t, std::vector<label_id_t>> edges);

  /// Declared here, defined elsewhere.
  /// NOTE(review): presumably produces a fragment with the opposite
  /// directedness — confirm against the implementation.
  boost::leaf::result<vineyard::ObjectID> TransformDirection(
      vineyard::Client & client, int concurrency);

  /// Declared here, defined elsewhere. Selects the vertex properties of
  /// `vlabel` by name.
  /// NOTE(review): presumably merges them into a single column named
  /// `consolidate_name` — confirm against the implementation.
  boost::leaf::result<vineyard::ObjectID> ConsolidateVertexColumns(
      vineyard::Client & client, const label_id_t vlabel,
      std::vector<std::string> const& prop_names,
      std::string const& consolidate_name);

  /// Overload selecting the vertex properties by property id.
  boost::leaf::result<vineyard::ObjectID> ConsolidateVertexColumns(
      vineyard::Client & client, const label_id_t vlabel,
      std::vector<prop_id_t> const& props, std::string const& consolidate_name);

  /// Declared here, defined elsewhere. Selects the edge properties of
  /// `elabel` by name.
  /// NOTE(review): presumably merges them into a single column named
  /// `consolidate_name` — confirm against the implementation.
  boost::leaf::result<vineyard::ObjectID> ConsolidateEdgeColumns(
      vineyard::Client & client, const label_id_t elabel,
      std::vector<std::string> const& prop_names,
      std::string const& consolidate_name);

  /// Overload selecting the edge properties by property id.
  boost::leaf::result<vineyard::ObjectID> ConsolidateEdgeColumns(
      vineyard::Client & client, const label_id_t elabel,
      std::vector<prop_id_t> const& props, std::string const& consolidate_name);

 private:
  /// Called from PostConstruct to initialize the pointers for arrays and
  /// tables. Defined elsewhere.
  void initPointers();

  /// Declared here, defined elsewhere.
  /// NOTE(review): presumably fills `fid_lists` / `fid_lists_offset` with the
  /// destination fragments reached via in and/or out edges, as selected by
  /// the two flags — confirm against the implementation.
  void initDestFidList(
      bool in_edge, bool out_edge,
      std::vector<std::vector<std::vector<fid_t>>>& fid_lists,
      std::vector<std::vector<std::vector<fid_t*>>>& fid_lists_offset);

  /// Declared here, defined elsewhere. Receives builders for the outgoing
  /// edge lists and their offsets, plus an `is_multigraph` output flag.
  /// NOTE(review): presumably fills them with the undirected form of this
  /// fragment's directed CSR — confirm against the implementation.
  void directedCSR2Undirected(
      vineyard::Client & client,
      std::vector<std::vector<std::shared_ptr<PodArrayBuilder<nbr_unit_t>>>> &
          oe_lists,
      std::vector<std::vector<std::shared_ptr<FixedInt64Builder>>> &
          oe_offsets_lists,
      int concurrency, bool& is_multigraph);

  // Members tagged [[shared]] carry a vineyard attribute; the untagged members
  // next to them are caches derived from them.
  // NOTE(review): the caches are presumably filled by initPointers() — confirm.
  [[shared]] fid_t fid_, fnum_;
  [[shared]] bool directed_;
  [[shared]] bool is_multigraph_;
  [[shared]] property_graph_types::LABEL_ID_TYPE vertex_label_num_;
  [[shared]] property_graph_types::LABEL_ID_TYPE edge_label_num_;
  // Local out/in edge counts, recomputed in PostConstruct.
  size_t oenum_, ienum_;  // FIXME: should be pre-computable

  [[shared]] String oid_type, vid_type;

  // Per-label vertex counts: inner, outer, and the bound used for all
  // vertices.
  [[shared]] vineyard::Array<vid_t> ivnums_, ovnums_, tvnums_;

  // Per-label vertex property tables and raw column pointers
  // ([label][property]).
  [[shared]] List<std::shared_ptr<Table>> vertex_tables_;
  std::vector<std::vector<const void*>> vertex_tables_columns_;

  // Per-label gid arrays of the outer vertices, indexed by
  // (offset - inner vertex count); see GetOuterVertexGid.
  [[shared]] List<std::shared_ptr<vid_vineyard_array_t>> ovgid_lists_;
  std::vector<const vid_t*> ovgid_lists_ptr_;

  // Per-label gid -> local id maps for outer vertices; see
  // OuterVertexGid2Vertex.
  [[shared]] List<std::shared_ptr<vineyard::Hashmap<vid_t, vid_t>>> ovg2l_maps_;
  std::vector<vineyard::Hashmap<vid_t, vid_t>*> ovg2l_maps_ptr_;

  // Per-label edge property tables and raw column pointers.
  [[shared]] List<std::shared_ptr<Table>> edge_tables_;
  std::vector<std::vector<const void*>> edge_tables_columns_;
  std::vector<const void**> flatten_edge_tables_columns_;

  // Incoming / outgoing neighbor arrays and their offset arrays, indexed by
  // [vertex label][edge label].
  [[shared]] List<List<std::shared_ptr<FixedSizeBinaryArray>>> ie_lists_,
      oe_lists_;
  std::vector<std::vector<const nbr_unit_t*>> ie_ptr_lists_, oe_ptr_lists_;
  [[shared]] List<List<std::shared_ptr<Int64Array>>> ie_offsets_lists_,
      oe_offsets_lists_;
  std::vector<std::vector<const int64_t*>> ie_offsets_ptr_lists_,
      oe_offsets_ptr_lists_;

  // Destination fragment id lists and per-vertex pointers into them, indexed
  // by [vertex label][edge label]; read by IEDests / OEDests / IOEDests.
  std::vector<std::vector<std::vector<fid_t>>> idst_, odst_, iodst_;
  std::vector<std::vector<std::vector<fid_t*>>> idoffset_, odoffset_,
      iodoffset_;

  [[shared]] std::shared_ptr<vertex_map_t> vm_ptr_;

  // Initialized in PostConstruct from fnum_ and vertex_label_num_.
  vineyard::IdParser<vid_t> vid_parser_;

  // JSON form of the schema, and the schema object that PostConstruct fills
  // from it.
  [[shared]] json schema_json_;
  PropertyGraphSchema schema_;

  // The builder and the projected fragment are granted access to the private
  // members above.
  friend class ArrowFragmentBaseBuilder<OID_T, VID_T, VERTEX_MAP_T>;

  template <typename _OID_T, typename _VID_T, typename VDATA_T,
            typename EDATA_T, typename _VERTEX_MAP_T>
  friend class gs::ArrowProjectedFragment;
};

}  // namespace vineyard

#endif  // MODULES_GRAPH_FRAGMENT_ARROW_FRAGMENT_MOD_H_

// vim: syntax=cpp
